AI项目实战(7)数据分析 Agent

《AI Agent 实战》系列 · 数据分析 Agent

Posted by Ryan on 2026-06-29
Estimated Reading Time 29 Minutes
Words 6.8k In Total
Viewed Times

全书主线进度:前三个项目,Agent 学会了"用工具"“查资料”“多轮审查”。可有个工程难题一直没解——工具对接。你想让 Agent 连公司的数据库、连 BI、连 API 网关……难道每个系统都得手写一套胶水代码?写完五套,第六套又得从头来。本章就来攻 MCP(模型上下文协议)——给 Agent 装一个"USB 口",数据源即插即用。

主讲能力:MCP(模型上下文协议)+ 代码执行沙箱
业务场景:用户用自然语言描述分析需求,Agent 自动查表结构 → 写 SQL → 执行 Python 分析 → 输出报告。
技术栈:LangChain create_agent + SQLite 数据库 + Python 代码沙箱 + 准备接入 MCP


6.1 老李的痛点

6.1.1 业务背景与痛点

先讲个真事。

老李是公司的运营,每周五下午雷打不动要交一份数据周报:上周哪个品类卖得最好、哪个区域在掉量、爆款该不该补货。活儿不复杂,可老李不会写 SQL,Excel 透视表也玩不溜。他的套路是:把需求整理成一段话,微信发给数据分析师小王,然后——等。

小王手里排着七八个需求,老李这份周报,顺利的话等一天,不顺利等三天。有一回老板临时开会要个数,偏偏小王请假,老李对着空荡荡的数据看板,急得想原地转行。

这种事,每个公司都在上演。痛点说白了就一句话:

会问问题的人不会取数,会取数的人排不上队。

那如果,让 Agent 直接连上数据库呢?老李只要说一句"上个月哪个产品卖得最好",它自己去查表结构、自己写 SQL、自己跑分析、自己出报告——中间不排队、不等人。门槛,直接降到零。

这就是项目四要干的事。

6.1.2 用户故事

编号 作为 我想要 以便
US-1 运营 用自然语言问数据问题,AI 自动查数据库 不用学 SQL
US-2 数据分析师 AI 先帮我写 SQL 跑数据,我再决定要不要深入分析 省时间
US-3 安全工程师 数据查询限制在 SELECT,不能修改数据 防止误操作

6.1.3 功能性需求

  • FR-1 表结构发现:Agent 能自动获取数据库表结构
  • FR-2 自然语言 → SQL:Agent 将用户问题转换为 SQL 查询
  • FR-3 安全限制:只允许 SELECT,禁止 INSERT/UPDATE/DELETE/DROP
  • FR-4 Python 分析:Agent 在安全沙箱中执行 Python 分析代码
  • FR-5 结果解释:用中文向用户解释分析结果

6.2 画个样子:它该长啥样

%%{init: {'theme':'base','flowchart':{'useMaxWidth':true,'htmlLabels':true}}}%%
graph TD
    U["用户:上个月哪个产品卖得最好?"] --> Agent["数据分析 Agent"]
    Agent --> T1["get_table_schema
了解表结构"] T1 --> T2["query_database
执行 SELECT 查询"] T2 --> T3["execute_python
安全沙箱分析"] T3 --> Report["自然语言分析报告"] classDef uN fill:#e3f2fd,stroke:#1976d2,stroke-width:2px,color:#0d47a1 classDef aN fill:#ede7f6,stroke:#5e35b1,stroke-width:2px,color:#311b92 classDef tN fill:#e0f7fa,stroke:#00acc1,stroke-width:2px,color:#006064 classDef rN fill:#e8f5e9,stroke:#43a047,stroke-width:2px,color:#1b5e20 class U uN class Agent aN class T1,T2,T3 tN class Report rN

把这张图横过来看,Agent 干的活拆开就四步:先摸清表长啥样,再写 SQL 取数,接着用 Python 算一算,最后说人话汇报。

注意,这四步的顺序不是写死在代码里的 if-else,而是 Agent 自己"想"出来的——这就是一种轻量的任务规划:模型读到你那句"上个月哪个产品卖得最好",自己盘算"我得先知道有哪些表、列叫啥,才能写 SQL",于是先调 get_table_schema。本项目没有单独的"规划器"类,规划就藏在 create_agent 的工具调用循环里:模型每一步根据上一步的结果,决定下一步该喊哪个工具。


6.3 拆开看:怎么造出来

6.3.1 技术选型

技术点 选型 理由
Agent create_agent 全链路统一
数据库 SQLite(演示数据) 零配置,可替换为 PostgreSQL
SQL 安全 关键词白名单 只允许 SELECT,拒绝 DROP/DELETE/INSERT/UPDATE
Python 沙箱 受限 __builtins__ + 关键词检测 禁止 import os/subprocess 等危险模块
MCP 准备 架构预留 MCP Server 接口 生产环境可改为通过 MCP 协议接入真实数据库

选型表里有一行特别值得咂摸——“MCP 准备”。注意是"准备",不是"已完成"。这点很重要,后面会老实交代。

6.3.2 关键设计:SQL 安全

1
2
3
4
dangerous = ["DROP", "DELETE", "INSERT", "UPDATE", "ALTER", "CREATE"]
for kw in dangerous:
if kw in sql_upper:
return f"错误:不允许执行包含 {kw} 的 SQL 语句。"

看到这段,你心里可能冒出一个问题——

“既然模型会写 SQL,那干嘛不直接给它一个数据库连接串,让它写完就跑?多干脆。”

这是个好问题,也是这一章第一个反直觉点。咱们走一遍心路历程。

第一反应,确实,直接连最省事。但冷静两秒,三个鬼就冒出来了:

第一个鬼,幻觉。模型今天心情好给你写 SELECT,明天抽风就可能写出 DROP TABLE sales;。一句 SQL 下去,演示库没了是小事,万一连的是生产库呢?老李的周报没出来,老板的订单先没了。

第二个鬼,越权。就算不 DROP,模型也可能 UPDATEDELETE,把别人的数据改了。数据分析本该是只读的,凭什么给它写权限?

第三个鬼,复用。每个 Agent 都自己撸一套连库代码,N 个 Agent × M 个数据源,就是 N×M 套胶水。改一次密码,全员跟着改。

所以第一道闸,就是在执行前拦一道:只放行 SELECT,危险关键词一律挡下。上面这段就是 validate_sql() 的核心思路——简单粗暴,但有效。

💡 顿悟时刻:安全检查的顺序有讲究。先判"是不是 SELECT 开头",再扫"有没有危险关键词"。这两道闸一前一后,把绝大多数越权操作挡在执行之前,根本不碰数据库。安全的本质,是让危险连数据库的门都摸不到

6.3.3 关键设计:Python 沙箱

1
2
3
4
forbidden = ["import os", "import subprocess", "eval(", "exec(", "open("]
for fb in forbidden:
if fb in code:
return f"错误:代码包含禁止的操作({fb}),已被沙箱拦截。"

光会查 SQL 还不够。有时候算个同比增长、做个分组汇总,SQL 写起来又长又丑,不如直接跑一段 Python。于是有了 execute_python 工具——让模型写代码、在沙箱里跑。

⚠️ 避坑(重点):代码执行的安全边界

这是本章最容易踩雷的地方,必须讲透。

模型生成的代码,默认不可信。它完全可能"好心"地写出:

1
2
import os
os.system("rm -rf /") # 一行代码,天台见

所以沙箱要拦三样东西:

  1. 危险模块ossubprocesssocketrequests……能碰系统、能联网的,统统禁。
  2. 危险函数evalexecopencompile__import__……能动态执行、能读写文件的,禁。
  3. 危险内置:把 __builtins__ 换成一份白名单,只留 printlensum 这些人畜无害的。

但这里必须说句大实话——

字符串匹配的沙箱,挡得住君子,挡不住高手。

上面的 forbidden 列表是子串匹配,import os 能拦,可 import os(多一个空格)、__import__("os")getattr(__builtins__, "ev"+"al") 这些花活儿,子串匹配就未必拦得住。本项目的沙箱是教学级的,演示"防御该有哪些层"这个思想;真要上生产,光靠字符串拦是不够的,得上进程/容器隔离(Docker、firejail、或干脆把代码扔到远程独立环境执行),让恶意代码就算逃逸也碰不到宿主机。沙箱不是用来证明代码安全,而是用来假设代码不安全后,把爆炸半径圈到最小。

💡 文档里的代码清单是教学简化版。仓库 backend/projects/p04_data_analysis/tools.py 的实际实现比清单更硬:用正则匹配多种 import 写法(import osfrom os import 都能拦),用 signal.alarm 给代码执行加超时(防 while True 死循环),还额外提供 list_available_tablesget_available_tools。但即便如此,它依然是进程内的教学沙箱,不是生产级隔离——这点别误会。

6.3.4 一道反直觉题:既然能直接连库,为什么还要 MCP?

这是本章的核心问题,也是"准备接入 MCP"那行选型背后真正想说的事。

再回到那个朴素的想法:Agent 要连数据库,写个 sqlite3.connect() 不就完了?对,单个项目、单个数据库,确实完了。可把镜头拉远——

你公司有 3 个 Agent(数据分析、客服、运营),要接 4 个数据源(MySQL、PostgreSQL、内部 BI、第三方 API)。每个 Agent 给每个数据源写一套对接代码,就是 3×4 = 12 套。再加一个 Agent?变 16 套。再加一个数据源?变 20 套。这就是传说中的 N×M 对接噩梦

这不是新问题。二十年前电脑也这样:鼠标是圆口、键盘是 PS/2、打印机是并口、相机是火线……N 个设备 × M 台电脑,抽屉里塞满互不兼容的线缆。后来有了 USB,一夜之间,所有设备一个口、所有电脑一个口,N×M 变成了 N+M

MCP 之于 Agent,就是 USB 之于电脑。

MCP(Model Context Protocol,模型上下文协议)干的事,就是给"Agent"和"数据源/工具"之间定一个标准插头。数据源只要实现一次 MCP Server(把自己的能力按标准暴露成 tools/resources),任何支持 MCP 的 Agent 都能即插即用;Agent 只要会读 MCP,就能接天下所有数据源。N×M 的胶水,塌缩成 N+M 的两份适配。

所以本项目的 mcp.py,命名不是随便起的——它是按 MCP 的接口形态预留的数据访问层。execute_queryget_table_schemalist_tables 这几个函数,签名干净、无副作用,正好能一一映射成 MCP 的 tools。把它们包成真正的 MCP Server(用官方 SDK 注册 tools、跑 stdio/SSE 通信),就是生产演进的方向。

⚠️ 老实交代一句:教学版里,Agent 是直接把这些函数当普通 Python 工具调用的,并没有真正用 MCP 的 JSON-RPC 协议把它们暴露成 Server。换句话说,本章把"标准化数据访问"这层想清楚了、代码也按 MCP 的样子摆好了,但协议层的那一步还没迈出去——这叫"准备接入",不叫"已接入"。别看了章节标题就以为本项目已经跑起一个真 MCP Server,那是下一阶段的事。本章的价值,是让你先把"为什么要标准化"这件事,从骨头里想明白。

金句:标准化的价值,不在今天少写一行,而在明天少改一百行。


6.4 动手写:三层架构完整代码

6.4.1 架构分层总览

层级 文件名 职责 核心类/函数
领域模型层 models.py 查询结果、表结构、分析报告的数据结构 QueryResult, TableSchema, AnalysisReport
提示词层 prompts.py 系统提示词、SQL 编写指南、报告模板 build_system_prompt(), build_report_prompt()
MCP 数据访问层 mcp.py 数据库连接、初始化、安全查询执行 execute_query(), get_table_schema(), validate_sql()
工具层 tools.py 封装给 Agent 调用的工具函数 query_database(), execute_python()
服务层 service.py 快捷分析方法、查询缓存、报告生成 DataAnalysisService
项目入口层 project.py Agent 构建、项目注册 DataAnalysisProject

这张表是本章的地图。从下往上看:models 定数据结构,prompts 管提示词,mcp 专门管"怎么安全地碰数据库",tools 把能力包成 Agent 能调的工具,service 沉淀常用业务逻辑,project 负责把 Agent 装配起来注册上线。一层一层,各司其职。

💡 下面六个小节的代码清单是教学简化版,聚焦讲清每层的核心思路。仓库 backend/projects/p04_data_analysis/ 里的真实代码更完整(多了 init_database、正则沙箱、超时控制、缓存清理等),但骨架和下面一致。看懂清单,就能看懂仓库。


6.4.2 models.py - 领域模型层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""P04 数据分析 Agent - 领域模型层"""
from __future__ import annotations

import json
from dataclasses import asdict, dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any


class QueryStatus(str, Enum):
"""查询执行状态。"""
SUCCESS = "success"
ERROR = "error"
TIMEOUT = "timeout"
FORBIDDEN = "forbidden"


@dataclass
class ColumnInfo:
"""数据库列信息。"""
name: str
type: str
nullable: bool = True
description: str = ""

def to_dict(self) -> dict[str, Any]:
return asdict(self)


@dataclass
class TableSchema:
"""数据库表结构信息。"""
table_name: str
columns: list[ColumnInfo] = field(default_factory=list)
row_count: int = 0
sample_rows: list[dict] = field(default_factory=list)

def get_column_names(self) -> list[str]:
return [c.name for c in self.columns]

def to_markdown(self) -> str:
"""转换为 Markdown 表格格式。"""
lines = [f"### 表 {self.table_name}", ""]
lines.append(f"**总行数**: {self.row_count}")
lines.append("")
lines.append("| 列名 | 类型 | 说明 |")
lines.append("|------|------|------|")
for col in self.columns:
lines.append(f"| {col.name} | {col.type} | {col.description or '-'} |")
return "\n".join(lines)


@dataclass
class QueryResult:
"""SQL 查询结果。"""
sql: str
status: QueryStatus
columns: list[str] = field(default_factory=list)
rows: list[list[Any]] = field(default_factory=list)
row_count: int = 0
error_message: str = ""
execution_time: float = 0.0
executed_at: str = field(default_factory=lambda: datetime.now().isoformat())

@property
def success(self) -> bool:
return self.status == QueryStatus.SUCCESS

def to_markdown(self, max_rows: int = 20) -> str:
if not self.success:
return f"**查询失败**: {self.error_message}"
if not self.rows:
return "查询结果为空。"
lines = [f"**查询结果**(共 {self.row_count} 行):", ""]
lines.append("| " + " | ".join(self.columns) + " |")
lines.append("|" + "|".join(["------"] * len(self.columns)) + "|")
for row in self.rows[:max_rows]:
values = [str(v) if v is not None else "NULL" for v in row]
lines.append("| " + " | ".join(values) + " |")
if len(self.rows) > max_rows:
lines.append(f"\n... (显示前 {max_rows} 行)")
return "\n".join(lines)

核心代码讲解

  • 富数据结构QueryResult 不仅包含数据,还包含执行状态、耗时、错误信息——查成功了还是被拦了,一眼分明。
  • 多格式输出to_markdown() 让查询结果能直接展示给用户,无需额外转换。
  • Schema 自描述TableSchema 包含列信息和示例数据,Agent 拿到它就能更准确地生成 SQL,不用靠猜。
  • 状态枚举QueryStatus 把"成功/出错/超时/被禁"四种结局分开,Agent 能据此决定是换个 SQL 重试,还是直接告诉用户"这操作不允许"。

💡 清单里没列全。仓库的 models.py 还定义了 AnalysisReport(完整分析报告)和 PythonExecutionResult(代码执行结果)两个数据类,给 service.py 生成报告用。


6.4.3 mcp.py - MCP 数据访问层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
"""P04 数据分析 Agent - MCP 数据访问层"""
from __future__ import annotations

import sqlite3
import time
from contextlib import contextmanager
from pathlib import Path

from core.config import DATA_DIR
from core.logging_conf import get_logger

from .models import ColumnInfo, QueryResult, QueryStatus, TableSchema

logger = get_logger("p04.data_analysis.mcp")
DB_PATH = DATA_DIR / "analysis.db"

FORBIDDEN_KEYWORDS = {"INSERT", "UPDATE", "DELETE", "DROP", "ALTER",
"CREATE", "TRUNCATE", "GRANT", "REVOKE"}


@contextmanager
def get_db_connection() -> sqlite3.Connection:
"""获取数据库连接(上下文管理器,自动关闭)。"""
conn = sqlite3.connect(str(DB_PATH), timeout=10.0)
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()


def validate_sql(sql: str) -> tuple[bool, str]:
"""验证 SQL 安全性。"""
sql_upper = sql.strip().upper()

# 必须以 SELECT 开头
if not sql_upper.startswith("SELECT"):
return False, "只允许执行 SELECT 查询"

# 检查禁止的关键字
for keyword in FORBIDDEN_KEYWORDS:
if keyword in sql_upper:
return False, f"不允许执行包含 {keyword} 的 SQL 语句"

return True, ""


def execute_query(sql: str, max_rows: int = 500) -> QueryResult:
"""安全执行 SQL 查询。"""
start_time = time.time()
logger.info("执行 SQL 查询: %s", sql[:200])

is_safe, error_msg = validate_sql(sql)
if not is_safe:
return QueryResult(
sql=sql,
status=QueryStatus.FORBIDDEN,
error_message=error_msg,
)

try:
with get_db_connection() as conn:
cur = conn.execute(sql)
columns = [desc[0] for desc in cur.description]
rows = [list(row) for row in cur.fetchmany(max_rows)]

return QueryResult(
sql=sql,
status=QueryStatus.SUCCESS,
columns=columns,
rows=rows,
row_count=len(rows),
execution_time=time.time() - start_time,
)
except sqlite3.Error as e:
return QueryResult(
sql=sql,
status=QueryStatus.ERROR,
error_message=f"SQL 执行错误: {e}",
execution_time=time.time() - start_time,
)


def get_table_schema(table_name: str = "sales") -> TableSchema:
"""获取数据库表结构信息。"""
with get_db_connection() as conn:
pragma = conn.execute(f"PRAGMA table_info({table_name})").fetchall()
columns = [ColumnInfo(name=col["name"], type=col["type"]) for col in pragma]
count = conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]

return TableSchema(
table_name=table_name,
columns=columns,
row_count=count,
)

核心代码讲解

  • 安全第一validate_sql() 在执行前做双重检查(SELECT 开头 + 禁止关键字),危险 SQL 连 conn.execute 都到不了。
  • 资源管理@contextmanager + with 语句确保连接自动关闭,查完即散,不留泄漏。
  • 错误分级QueryStatus 区分 FORBIDDEN(被安全策略拦下)、ERROR(数据库报错)、TIMEOUT,便于 Agent 采取不同策略——被拦了就别重试同样的 SQL,报错了可以修一修再来。
  • MCP 兼容execute_queryget_table_schema 这几个函数签名干净、无副作用,正好能一一映射成 MCP 的 tools。这就是"按 MCP 接口形态预留"的意思。

⚠️ 关于"MCP"这个名字,再说清楚一次:这个文件叫 mcp.py,但它不是一个真正的 MCP Server——没有 MCP SDK,没有 JSON-RPC,没有 stdio/SSE 通信。它只是一个形状像 MCP tools 的数据访问层。教学版里 Agent 直接 from .mcp import execute_query 当普通函数调;真正变成 MCP Server,是用官方 SDK 把这些函数注册成 tools、起一个 stdio 进程让外部 Agent 来连。这步本章没做,是后续演进。这里诚实标注,免得你对着文件名产生误会。

💡 清单里没列全。仓库的 mcp.py 还提供 list_tables()(列出所有表)和 init_database()(首次运行自动建表插演示数据),validate_sql 还多挡了 PRAGMA/ATTACH/LOAD_EXTENSION 等系统级操作,execute_query 还做了结果超 500 行截断。


6.4.4 tools.py - 工具层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
"""P04 数据分析 Agent - 工具层"""
from __future__ import annotations

import io
import sys
import time

from langchain.tools import tool

from core.logging_conf import get_logger

from .mcp import execute_query, get_table_schema

logger = get_logger("p04.data_analysis.tools")
FORBIDDEN_IMPORTS = ["os", "subprocess", "sys", "shutil", "socket", "requests"]
FORBIDDEN_FUNCTIONS = ["eval(", "exec(", "open(", "compile(", "__import__"]


@tool
def query_database(sql: str) -> str:
"""对分析数据库执行 SQL 查询。

注意:只允许 SELECT 查询,禁止任何修改数据的操作。
"""
result = execute_query(sql)
return result.to_markdown()


@tool
def get_table_schema_info(table_name: str = "sales") -> str:
"""获取数据库表结构信息(列名、类型、示例数据)。

在编写 SQL 之前,**必须先调用此工具**了解表结构!
"""
try:
schema = get_table_schema(table_name)
return schema.to_markdown()
except Exception as e:
return f"获取表结构失败: {e}"


@tool
def execute_python(code: str) -> str:
"""在安全沙箱中执行 Python 代码,用于数据分析。"""
# 沙箱检查
for forbidden in FORBIDDEN_IMPORTS:
if f"import {forbidden}" in code or f"from {forbidden}" in code:
return f"错误:禁止导入 {forbidden} 模块"

for forbidden in FORBIDDEN_FUNCTIONS:
if forbidden in code:
return f"错误:禁止使用 {forbidden.replace('(', '')} 函数"

# 重定向 stdout
old_stdout = sys.stdout
sys.stdout = captured_output = io.StringIO()

try:
# 预导入安全的常用库
exec_globals = {"__builtins__": {
"print": print, "len": len, "range": range,
"list": list, "dict": dict, "str": str,
"int": int, "float": float, "sum": sum,
"max": max, "min": min, "sorted": sorted,
}}
exec("import math\nimport statistics\nimport json", exec_globals)
exec(code, exec_globals)

output = captured_output.getvalue()
if len(output) > 5000:
output = output[:5000] + "\n... (输出过长,已截断)"
return output if output else "代码执行成功,无输出。"
except Exception as e:
return f"代码执行出错: {type(e).__name__}: {e}"
finally:
sys.stdout = old_stdout

核心代码讲解

  • 工具友好:每个工具都有清晰的文档字符串(@tool 装饰器会把它喂给模型),Agent 能准确理解用途和限制。注意 get_table_schema_info 的 docstring 里那句"必须先调用此工具"——这是在用提示词引导模型的规划顺序。
  • 防御性编程execute_python() 实现多层沙箱防护——先扫禁止 import、再扫禁止函数、最后把 __builtins__ 换成白名单,三道闸。
  • 输出优化:返回 Markdown 格式,Agent 可以直接展示给用户;超长输出自动截断,防止一段 print 把上下文窗口撑爆。
  • stdout 捕获:把 sys.stdout 重定向到 StringIO,用户代码里 print 的内容就能被捕获返回。

⚠️ 避坑(再强调):清单里的沙箱用的是子串匹配if f"import {forbidden}" in code)。这种写法挡得住规范的 import os,但挡不住 import os(多空格)、import\tos(制表符)、__import__("os") 这类变体。这就是为什么仓库里的真实实现换成了正则re.compile(r'^\s*import\s+os\b'),多种写法一网打尽),还加了 signal.alarm 超时防死循环。但前面说过,正则+超时依然是进程内教学沙箱,生产环境请上容器隔离。沙箱的安全,永远是个程度问题,不是个有无问题。

💡 清单里没列全。仓库的 tools.py 还提供 list_available_tables(列出所有表)和 get_available_tools(汇总工具清单给 project.py 用),execute_python 还预导入 datetime/collections/itertools/re/random 等更多安全库。


6.4.5 service.py - 服务层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
"""P04 数据分析 Agent - 服务层"""
from __future__ import annotations

import hashlib
import time

from core.logging_conf import get_logger

from .mcp import execute_query, get_table_schema, list_tables
from .models import AnalysisReport, QueryResult

logger = get_logger("p04.data_analysis.service")
CACHE_TTL = 300
_query_cache: dict[str, tuple[QueryResult, float]] = {}


class DataAnalysisService:
"""数据分析服务。"""

def query_with_cache(self, sql: str, use_cache: bool = True) -> QueryResult:
"""带缓存的查询。"""
cache_key = hashlib.md5(sql.encode()).hexdigest()

if use_cache and cache_key in _query_cache:
result, timestamp = _query_cache[cache_key]
if time.time() - timestamp <= CACHE_TTL:
return result

result = execute_query(sql)
if use_cache and result.success:
_query_cache[cache_key] = (result, time.time())
return result

def get_total_revenue(self, start_date: str = None, end_date: str = None) -> float:
"""快捷方法:查询总销售额。"""
sql = "SELECT SUM(quantity * unit_price) FROM sales"
conditions = []
if start_date:
conditions.append(f"date >= '{start_date}'")
if end_date:
conditions.append(f"date <= '{end_date}'")
if conditions:
sql += " WHERE " + " AND ".join(conditions)

result = self.query_with_cache(sql)
if result.success and result.rows:
return float(result.rows[0][0] or 0)
return 0.0

def get_revenue_by_category(self) -> dict[str, float]:
"""快捷方法:按品类统计销售额。"""
sql = """SELECT category, SUM(quantity * unit_price)
FROM sales GROUP BY category ORDER BY 2 DESC"""
result = self.query_with_cache(sql)
if result.success:
return {row[0]: float(row[1] or 0) for row in result.rows}
return {}

def get_schema_summary(self) -> str:
"""获取数据库结构摘要(用于提示词)。"""
tables = list_tables()
summaries = []
for table in tables:
schema = get_table_schema(table)
summaries.append(f"- **{table}**: {', '.join(schema.get_column_names())}")
return "\n".join(summaries)

核心代码讲解

  • 查询缓存:相同 SQL 在 5 分钟(CACHE_TTL=300)内直接命中缓存,不再碰数据库。老李连问三遍"上个月销冠是谁",数据库只被骚扰一次。
  • DSL 快捷方法get_total_revenueget_revenue_by_category 把常用分析场景封装成现成方法,不依赖 Agent 每次都生成正确的 SQL——重要指标走"快车道",又快又稳。
  • 提示词辅助get_schema_summary() 为 Agent 提供精简的数据库元信息,让模型心里有数再下笔。

💡 清单里 from .mcp import ... list_tablesfrom .models import AnalysisReport 引用的符号,在前面 mcp.py/models.py 的教学清单里没列出来——它们在仓库的真实模块里都有定义(list_tablesmcp.pyAnalysisReportmodels.py)。这是清单简化导致的,不是代码有 bug。仓库的 service.py 还提供 get_revenue_by_regionget_top_productsgenerate_reportclear_cache 等更多方法。


6.4.6 project.py - 项目入口层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
"""P04 数据分析 Agent - 项目入口层"""
from __future__ import annotations

from langchain.agents import create_agent

from core import BaseProject, build_chat_model, registry
from core.logging_conf import get_logger

from .prompts import build_system_prompt
from .tools import get_available_tools

logger = get_logger("p04.data_analysis.project")


class DataAnalysisProject(BaseProject):
"""数据分析 Agent 项目。"""

id = "p04_data_analysis"
name = "数据分析 Agent"
description = "自然语言查询数据库,自动 SQL + Python 分析。"
capabilities = ["MCP", "SQL", "Python 沙箱", "数据分析"]

def build_agent(self) -> any:
"""构建数据分析 Agent。"""
model = build_chat_model()
tools = get_available_tools()
system_prompt = build_system_prompt()
return create_agent(model=model, tools=tools, system_prompt=system_prompt)


registry.register(DataAnalysisProject())

核心代码讲解

  • 关注点分离:MCP 层负责数据访问,Service 层负责业务逻辑,Project 层负责 Agent 编排——三层互不越界,改一层不连累另两层。
  • 装配即用build_agent() 三行就把模型、工具、提示词凑齐,交给 create_agent 装成一个能跑的 Agent,最后 registry.register 往全局注册表一挂,统一 API 就能调它。
  • MCP 就绪:架构已按 MCP 接口形态预留数据访问层,迁移到标准化协议时,替换 mcp.py 的实现即可,上层 tools/service/project 基本不动——这就是分层带来的红利。

💡 清单里 from .prompts import build_system_prompt 引用的 prompts.py、以及 from .tools import get_available_tools 引用的汇总函数,都没在前面的清单里展示——它们在仓库里都有。prompts.py 负责系统提示词和 SQL 编写指南,get_available_toolstools.py 里把四个工具收成一份清单。仓库的 project.py 还会在构建时拉取 get_schema_summary() 注入提示词,让 Agent 开局就知道数据库长啥样。


6.5 跑一跑:它真的行吗

测试 验证内容
test_get_schema 表结构包含 date/product 等字段
test_query_valid SELECT 查询正常执行
test_query_rejects_dangerous DROP TABLE 被拒绝
test_forbidden_import import os 被沙箱拦截

这四个测试,正好对应前面埋的四道闸:表结构能拿到、正常查询能跑、危险 SQL 被挡、危险 import 被拦。安全功能没测试就是摆设——这四个用例,就是把"我声称安全"变成"我证明安全"。


6.6 送上线:让它上班

同前——继承 BaseProject,自动接入统一 API。


6.7 回头看:学到了什么

能力 在本项目中的体现
MCP 准备 架构预留 MCP Server 接口
SQL 安全 关键词白名单,只允许 SELECT
Python 沙箱 受限 builtins + 关键词检测
自然语言→SQL Agent 先查表结构再写查询

常见坑

  1. 不查表结构直接写 SQL — 列名猜错,查询失败。必须先调 get_table_schema。让模型"先看再写",是少走弯路的关键。
  2. 沙箱不够严格 — 忘记拦截 open() 可能导致文件读取;子串匹配挡不住 import os 这类变体。教学沙箱够演示,生产请上容器隔离。
  3. 误以为已经跑起 MCP Server — 本章是"准备接入",mcp.py 是按 MCP 形态预留的数据访问层,并非真正的 MCP Server。协议层的那一步,留给后续。

📌 项目四完成。 老李终于不用排队等小王了——一句"上个月哪个产品卖得最好",Agent 自己查表、自己写 SQL、自己跑分析、自己出报告。下一章:AI 视频生成 Agent——多模态工具编排。


如果您喜欢此博客或发现它对您有用,则欢迎对此发表评论。 也欢迎您共享此博客,以便更多人可以参与。 如果博客中使用的图像侵犯了您的版权,请与作者联系以将其删除。 谢谢 !